package org.apache.flink.doris;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Doris Sink实现 (自定义flink doris sink)
 *
 * @author SuperWein
 */
public class DorisSink extends RichSinkFunction<String> {

    private static final Logger log = LoggerFactory.getLogger(DorisSink.class);

    private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));

    private DorisStreamLoad dorisStreamLoad;

    private String columns;

    private String jsonFormat;

    public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
        this.dorisStreamLoad = dorisStreamLoad;
        this.columns = columns;
        this.jsonFormat = jsonFormat;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }


    /**
     * 判断StreamLoad是否成功
     *
     * @param respContent streamload返回的响应信息（JSON格式）
     * @return
     */
    public static Boolean checkStreamLoadStatus(RespContent respContent) {
        return DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
                && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // 截取有效数据
        JSONObject data = JSONObject.parseObject(value);
        value = JSON.toJSONString(data.get("data"));
        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
        if (loadResponse != null && loadResponse.status == 200) {
            RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
            if (!checkStreamLoadStatus(respContent)) {
                log.error("Stream Load fail{}:", loadResponse);
            } else {
                log.info("Stream Load success{}:", loadResponse);
            }
        } else {
            log.error("Stream Load Request failed:{}", loadResponse);
        }
    }

}
